package net.wlm.realtime.funcation;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;

import java.util.HashSet;
import java.util.Set;


/**
 * Flink filter that keeps only change records belonging to a configured set of
 * tables and carrying one of the accepted operation types.
 *
 * <p>Each input element is parsed as a JSON object; the {@code table_name} and
 * {@code operate_type} fields of that object drive the decision.
 */
public class DimDataFilterFunction extends RichFilterFunction<String> {

    /** Operation types that are allowed through the filter. */
    private static final Set<String> OPERATE_TYPES = buildOperateTypes();

    /** Names of the tables whose records should be kept. */
    private final Set<String> dimSet;

    /**
     * Creates a filter for the given table names.
     *
     * @param dimSet names of the tables whose records should pass the filter
     */
    public DimDataFilterFunction(HashSet<String> dimSet) {
        this.dimSet = dimSet;
    }

    /**
     * Builds the set of accepted operation types once for the whole class,
     * avoiding an anonymous {@code HashSet} subclass per function instance.
     */
    private static Set<String> buildOperateTypes() {
        Set<String> types = new HashSet<>();
        types.add("insert");
        types.add("update");
        types.add("delete");
        return types;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    /**
     * Decides whether a record is kept.
     *
     * @param s the record as a JSON string
     * @return {@code true} when the record's {@code table_name} is in the configured
     *         table set and its {@code operate_type} is insert, update or delete;
     *         {@code false} otherwise, including when parsing yields no object or
     *         either field is absent
     * @throws Exception if the input cannot be parsed as JSON
     */
    @Override
    public boolean filter(String s) throws Exception {
        JSONObject jsonObject = JSON.parseObject(s);
        // Nothing to inspect when parsing produced no object.
        if (jsonObject == null) {
            return false;
        }
        // Table name
        String tableName = jsonObject.getString("table_name");
        // Operation type
        String operateType = jsonObject.getString("operate_type");
        // Keep the record only if both the table and the operation type are accepted.
        // Set.contains(null) on a HashSet is simply false, so missing fields are rejected.
        return dimSet.contains(tableName) && OPERATE_TYPES.contains(operateType);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }
}
